[SPARK-56687][SQL] Support netChanges for DSv2 CDC streaming reads #55637
gengliangwang wants to merge 10 commits into apache:master
Conversation
Implements carry-over removal and update detection for DSv2 CDC streaming reads, which previously rejected any post-processing with a blanket error.

The batch path uses a Catalyst Window keyed by (rowId, _commit_version), which UnsupportedOperationChecker rejects on streaming queries (NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING). The streaming rewrite expresses the same logic with streaming-allowed primitives: EventTimeWatermark on _commit_timestamp -> Aggregate keyed by (rowId..., _commit_version, _commit_timestamp) buffering events into a collect_list of structs -> [Filter on the carry-over predicate] -> Generate(Inline(events)) to re-emit rows -> [Project relabeling _change_type for delete+insert pairs] -> drop helper columns.

deduplicationMode=netChanges remains batch-only; it requires partitioning by rowId across the entire requested range and is fundamentally cross-batch. The existing INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED error is replaced with the more specific INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED, which now also points users at the supported streaming alternatives.

Also clarifies the Changelog.java contract: all rows of a single _commit_version must share a _commit_timestamp, and streaming reads expect non-decreasing _commit_timestamp across micro-batches. Adds a note in the DataStreamReader.changes() Scaladoc about the netChanges streaming limitation.
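For orientation, a minimal DataFrame-level sketch of that streaming-allowed chain (not the analyzer rewrite itself); the column names, the single-column row identity, and the placeholder carry-over predicate are illustrative assumptions:

```scala
import org.apache.spark.sql.functions._

// `changes` is assumed to be a streaming DataFrame with columns
// (id, data, _change_type, _commit_version, _commit_timestamp) and rowId = id.
val rewritten = changes
  .withWatermark("_commit_timestamp", "0 seconds")                       // EventTimeWatermark
  .groupBy(col("id"), col("_commit_version"), col("_commit_timestamp"))  // streaming Aggregate
  .agg(collect_list(struct(col("data"), col("_change_type"))).as("events"))
  .filter(size(col("events")) > 1)                                       // stand-in for the real carry-over predicate
  .select(col("id"), col("_commit_version"), col("_commit_timestamp"),
    inline(col("events")))                                               // Generate(Inline) re-emits the buffered rows
```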
@huaxingao FYI this is part of the SPIP for CDC support (SPARK-55668), targeting the Spark 4.2 release. We're aiming to get it ready and merged ASAP.
…date netChanges test
…ams and runtime walkthrough
Force-pushed from c5bc9bf to 4d48424
…l rewrite

Three fixes from viirya's review on apache#55636:

1. Strip the auto-injected EventTimeWatermark metadata from the user-visible `_commit_timestamp` output. The metadata flowed through `Generate(Inline)` onto the public output, where it would have interacted with downstream user-supplied watermarks via the global multi-watermark policy. A final Project at the boundary of the rewrite now removes `EventTimeWatermark.delayKey` so the watermark stays internal-only.

2. Reject non-Append output modes for streaming CDC reads with post-processing. The injected streaming Aggregate's append-mode emission (one group per `_commit_timestamp` once the watermark advances past it) is the only semantically valid mode -- Update would re-emit per-batch state changes, Complete would re-emit the full result table per batch, neither matching batch CDC semantics. UnsupportedOperationChecker now detects the rewrite by the `__spark_cdc_events` helper aggregate expression and throws STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION for non-Append modes.

3. Tighten the `_commit_timestamp` streaming contract in `Changelog.java`. The previous "non-decreasing across micro-batches" wording was too weak: Spark's stateful aggregate evicts groups with `eventTime <= watermark` (statefulOperators.scala:643-650), so equal-timestamp rows in a later micro-batch would be dropped as late. The contract now requires that all rows of a single commit appear in the same micro-batch -- the natural atomic-commit emission pattern of real CDC connectors (Delta versions, Iceberg snapshots) -- which makes the zero-delay watermark sound.

Adds a plan-shape test asserting no watermark metadata leaks to user-visible output, and two end-to-end negative tests covering Update / Complete output mode rejection.
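For context, a hedged usage sketch of the only configuration accepted after fix 2; `changes()` is the entry point from this PR series, while the option key and table name are assumptions for illustration:

```scala
// With streaming CDC post-processing enabled, only Append output mode is accepted;
// Update and Complete now fail with STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION.
val query = spark.readStream
  .option("computeUpdates", "true")   // post-processing option; exact key assumed
  .changes("cat.db.events")           // illustrative table name
  .writeStream
  .outputMode("append")
  .format("console")
  .start()
```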
…treaming CDC

Address the second sub-case from viirya's review on apache#55636. The previous contract change covered the same-commit-split-across-micro-batches case via "all rows of a single commit must appear in the same micro-batch", but missed the case where two DIFFERENT commits with the same `_commit_timestamp` arrive in different micro-batches. Spark's late-event filter and state-eviction predicate both use `LessThanOrEqual` (`statefulOperators.scala:633-651`), so once a micro-batch observes max event time T and advances the watermark to T, any later row at exactly `_commit_timestamp = T` is silently dropped as late. The atomic-microbatch contract alone doesn't rule this out for distinct commits.

Adds a second contract requirement: distinct `_commit_version` values must have distinct `_commit_timestamp` values when streaming post-processing is enabled. Atomic-commit CDC connectors that derive `_commit_timestamp` from wall-clock time at commit time (Delta, Iceberg) naturally satisfy this.

Doc-only change; no code modifications. The existing tests already exercise the supported cases; the unsupported case 2 is by definition a connector contract violation, so we don't add a test for it.
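A minimal plain-Scala illustration of the hazard (not Spark source): with the late-event filter and eviction both keyed on `eventTime <= watermark`, a second commit that reuses timestamp T lands behind a watermark already advanced to T:

```scala
// Micro-batch 1 processes commit v1 stamped T = 1000 ms; the watermark advances to 1000.
val watermark = 1000L

// Micro-batch 2 delivers a different commit v2 also stamped T = 1000 ms.
val laterCommitEventTime = 1000L

// LessThanOrEqual classifies the equal-timestamp row as late, so it is silently dropped.
val droppedAsLate = laterCommitEventTime <= watermark // true
```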
 * 6. Final [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so
 *    the output schema matches the connector's declared schema.
 */
private def addStreamingRowLevelPostProcessing(
Should we enforce non-null commit timestamp similar to CdcNetChangesStatefulProcessor.scala?
Good call -- added a matching analyzer-level guard for the row-level path. The streaming row-level rewrite now starts with a Filter that raises CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP (via RaiseError) on any row with a NULL _commit_timestamp, mirroring the runtime guard in CdcNetChangesStatefulProcessor.
A NULL _commit_timestamp would silently stall the row-level path (the downstream Aggregate uses _commit_timestamp as both the watermark column and a grouping key, and the eventTime <= watermark eviction predicate is never satisfied for a NULL key, so the group sits in state forever producing no output and no error). Failing fast at the analyzer level surfaces the contract violation immediately.
Lives on the row-level PR (#55636) since the method being reviewed is in that PR; this branch picks it up via rebase. Plan-shape and end-to-end tests added on #55636 too. Pushed in 791d5ce3246 (row-level PR), now visible on this PR after the rebase.
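A DataFrame-level sketch of the same fail-fast idea, assuming a streaming DataFrame `changes`; the real guard is a Catalyst Filter built inside the analyzer rule, and the error string here is illustrative:

```scala
import org.apache.spark.sql.functions._

// Pass _commit_timestamp through untouched when present; raise an error for NULLs
// instead of letting the NULL-keyed group sit in aggregate state forever.
val guarded = changes.withColumn(
  "_commit_timestamp",
  when(col("_commit_timestamp").isNotNull, col("_commit_timestamp"))
    .otherwise(raise_error(lit(
      "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP: _commit_timestamp must not be NULL"))))
```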
…ermark-strip

Two follow-ups on the streaming CDC row-level rewrite:

1. `dev/lint-scala` runs scalafmt on `sql/api`; my prior edit to `DataStreamReader.changes()` left the Scaladoc lines wrapped at the wrong column. Re-flowed via `./build/mvn scalafmt:format -pl sql/api`.

2. Updated the user-visible Scaladoc on `DataStreamReader.changes()` to reflect the watermark-metadata strip from dee5e84. The previous wording said "the watermark metadata is preserved on the user-visible `_commit_timestamp` output ... global watermark becomes the min of the two" -- that was accurate before the strip, but is now stale. The new wording says the metadata is stripped (so downstream user-supplied watermarks do not interact with it via the global multi-watermark policy) and explicitly notes that streaming row-level post-processing constrains the query to Append output mode.

Note: the Java unidoc CI step is failing on an unrelated pre-existing name-clash error in `core/target/java/.../JavaSparkContext.java:415` (`<K,V>union(Seq<JavaPairRDD<K,V>>)` vs `<T>union(Seq<JavaRDD<T>>)` -- same erasure). Verified identical to upstream master, so it's not from this PR.
…treaming row-level rewrite

Address @zikangh's review on apache#55637 -- the streaming row-level rewrite should enforce non-NULL `_commit_timestamp`, mirroring the runtime guard in CdcNetChangesStatefulProcessor. A NULL `_commit_timestamp` on a streaming read is a connector contract violation that would silently stall the row's group: the downstream streaming Aggregate uses `_commit_timestamp` as an event-time watermark column AND a grouping key, and Spark's eviction predicate is `LessThanOrEqual(eventTime, watermark)` -- a NULL group key never satisfies that, so the group sits in state until end of stream producing no output and no error.

Add a Filter at the top of the streaming row-level rewrite that raises CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP via the same RaiseError pattern used for the multiple-changes-per-row-version guard in the batch path. Also adds the new error class to error-conditions.json.

Tests:
- Plan-shape tests: assert the guard Filter is present and sits directly above the streaming relation (so it runs before any downstream operator sees the NULL).
- End-to-end test: feeding a row with a NULL `_commit_timestamp` surfaces CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP at the streaming query level rather than producing no output.
- Existing carry-over / update-detection plan-shape tests updated for the extra guard Filter (was 1 -> now 2 Filters in carry-over and combined paths; was 0 -> now 1 in update-detection-only).

Also refreshed the addStreamingRowLevelPostProcessing Scaladoc to add a step 0 (the guard) and step 7 (the watermark-metadata strip), keeping the per-operator detail aligned with the rewrite's actual shape. Doc-only side effect: scalafmt reflowed the watermark-metadata bullet in DataStreamReader.changes() Scaladoc (no semantic change).
Implements `deduplicationMode = netChanges` for DSv2 CDC streaming reads,
completing the streaming post-processing surface. The previous PR
(SPARK-56686) added carry-over removal and update detection for streaming
but kept netChanges batch-only.
The batch path uses a Catalyst Window partitioned by `rowId` and ordered
by `(_commit_version, change_type_rank)` to find the first/last events
per row identity, then applies the SPIP collapse matrix on
`(existedBefore, existsAfter)`. Window is rejected on streaming children.
This PR adds a streaming-friendly equivalent by delegating per-row-identity
state management to a new CdcNetChangesStatefulProcessor driven by
TransformWithState. The processor stores the first event ever observed and
the most-recent event observed for each rowId, and arms an event-time timer
on the latest `_commit_timestamp` for the key. When the global watermark
advances past the timer, handleExpiredTimer evaluates the SPIP matrix and
emits 0, 1, or 2 output rows -- identical semantics to the batch path.
Plan shape (streaming netChanges):
EventTimeWatermark(_commit_timestamp, 0s)
-> Project (alias rowId expressions to flat helper columns)
-> TransformWithState (grouping=rowId helpers, EventTime mode, Append)
-> SerializeFromObject
-> Project (drop rowId helper columns)
When carry-over removal / update detection are also requested, the
row-level rewrite is applied first; the netChanges TransformWithState then
sits on top, sharing the same EventTimeWatermark (it's reused rather than
stacked, which would be rejected by the multi-watermark check).
Documented limitation: row identities only touched in the latest observed
commit are held back until a later commit (with strictly greater
`_commit_timestamp`) advances the watermark past them, or the source
terminates. End-of-input flushes all timers, so bounded streams produce
output equivalent to the corresponding batch read.
Removes the now-obsolete error class
INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED that was introduced
by the previous PR; updates the DataStreamReader.changes() and
Changelog.java Scaladoc to describe the new behavior and the latest-commit
limitation. Tests in ChangelogResolutionSuite,
ResolveChangelogTablePostProcessingSuite, and the new plan-shape suite
flip from "expect throw" to "expect TransformWithState in plan", and
ChangelogEndToEndSuite gains two streaming netChanges end-to-end tests
covering the SPIP matrix's cancel and persist-via-update cases.
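To make the collapse matrix concrete, a minimal standalone sketch of what handleExpiredTimer's decision roughly amounts to; the Event case class, the change-type strings, and the derivation of existedBefore/existsAfter from the first/last events are assumptions for illustration, not the PR's code:

```scala
// Hypothetical event shape; the real processor works on Spark Rows.
case class Event(id: Long, data: String, changeType: String, commitVersion: Long)

// Net-changes collapse for one row identity, given the first and last events observed
// in the range (assumed derivation of the (existedBefore, existsAfter) cell).
def collapse(first: Event, last: Event, computeUpdates: Boolean): Seq[Event] = {
  val existedBefore = first.changeType != "insert" // row already existed before the range
  val existsAfter   = last.changeType != "delete"  // row still exists after the range
  (existedBefore, existsAfter) match {
    case (false, false) => Seq.empty                                   // insert..delete cancels out
    case (false, true)  => Seq(last.copy(changeType = "insert"))       // net effect: a single insert
    case (true, false)  => Seq(first.copy(changeType = "delete"))      // net effect: a single delete
    case (true, true) if computeUpdates =>
      Seq(first.copy(changeType = "update_preimage"),
          last.copy(changeType = "update_postimage"))                  // pre/post image pair
    case (true, true)   => Seq(last)                                   // without computeUpdates, keep the final image
  }
}
```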
…d test fixes
- Tighten Changelog.java to require LONG for _commit_version (post-processing
compares versions as primitive longs); update the NULL _commit_timestamp
clause to point to the new error class.
- CdcNetChangesStatefulProcessor: replace the internalError-with-string-prefix
pattern with structured CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE
(matches batch path) and add a NULL _commit_timestamp guard raising
CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP rather than NPE'ing on
getTime().
- Hoist `inputSchema.fieldIndex("_change_type")` out of `relabel` to avoid a
  linear scan per emitted row (see the sketch after this commit message).
- Expand `addStreamingNetChangeComputation` Scaladoc Step 1 to describe the
watermark reuse for combined row-level + netChanges paths.
- ResolveChangelogTableStreamingPostProcessingSuite: assert the
TransformWithState grouping attributes (so a regression that grouped by the
wrong attributes can't pass the size check alone).
- ChangelogEndToEndSuite "labels persisting rows as updates": rewrite the
fixture so the row identity actually exercises (existedBefore=true,
existsAfter=true) and the (true,true) cell of the SPIP matrix is asserted to
emit update_preimage + update_postimage; the previous fixture started with
an INSERT so it fell into the (false,true) cell and emitted INSERT.
- DataStreamReader.changes(): broaden the post-processing "Two implications
follow" intro from row-level-only to all post-processing modes so the
netChanges path is covered.
Co-authored-by: Isaac
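A hedged sketch of the hoist described in the fieldIndex bullet above; the relabel body and the helper class are assumptions about the processor's shape, not the PR's code:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

class RelabelHelper(inputSchema: StructType) extends Serializable {
  // fieldIndex does a linear scan over the schema, so resolve the ordinal once
  // per processor instance instead of once per emitted row.
  private val changeTypeOrdinal: Int = inputSchema.fieldIndex("_change_type")

  def relabel(row: Row, newChangeType: String): Row = {
    val values = row.toSeq.toArray
    values(changeTypeOrdinal) = newChangeType
    Row.fromSeq(values.toIndexedSeq)
  }
}
```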
…rowId

- ChangelogEndToEndSuite: 3 new e2e tests covering missing SPIP matrix cells ((true, false) DELETE, (true, true) without computeUpdates), and combined netChanges + carry-over removal post-processing path.
- ResolveChangelogTableStreamingPostProcessingSuite: assert grouping attribute on netChanges-alone test; add composite rowId plan-shape test.
- Remove stale "deduplicationMode=netChanges is rejected on streaming" test (netChanges is now supported on streaming) and the now-unused AnalysisException import.

Co-authored-by: Isaac
Force-pushed from e2f65a2 to e31cd2b
      Iterator.empty
    }

  override def handleExpiredTimer(
handleExpiredTimer emits the current net result and then clears both first/last state. In a long-running stream, the same rowId can receive later commits after an unrelated commit advances the watermark. Example: id=1 insert at v1, id=2 change at v2 advances watermark and emits id=1 insert, then id=1 delete at v3 emits a later delete. Batch netChanges over v1..v3 would cancel to no row, so this is not equivalent to the documented range-scoped netChanges semantics. The tests add all rows before starting the query, so they do not cover a later event arriving after an earlier timer fired.
      timerValues: TimerValues): Iterator[Row] = {
    val handle = getHandle
    val sorted = inputRows.toSeq.sortBy { row =>
      val v = row.getAs[Long]("_commit_version")
The changelog contract says _commit_version is connector-defined, while the batch path sorts the attribute generically through Catalyst. This processor hard-casts _commit_version with getAs[Long], so any connector using another orderable version type will fail at runtime or be unsupported only on the streaming path. The plan should either preserve Catalyst ordering/ranking before entering the processor, or explicitly validate/restrict the version type.
 *   .changes("my_table")
 * }}}
 *
 * Streaming reads support all of the same post-processing as batch reads -- `computeUpdates`,
The PR body notes that TransformWithState requires RocksDB, but the changes() Scaladoc says streaming supports all post-processing without mentioning that the default state store will fail at query start. Since this is an analyzer-injected operator rather than user-written transformWithState, users have little reason to know they must set the RocksDB provider.
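For reference, the setting a user would need before starting such a query; the provider class below is Spark's real RocksDB state store provider, everything else about the session is assumed:

```scala
// Must be set before the streaming query starts; the default HDFS-backed provider
// fails TransformWithState with UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
```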
What changes were proposed in this pull request?
This PR completes the DSv2 CDC streaming post-processing surface by implementing `deduplicationMode = netChanges` for streaming reads. The previous PR (#55636 / SPARK-56686) added carry-over removal and update detection for streaming but left netChanges batch-only.

The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to find the first and last events per row identity, then applies the SPIP collapse matrix on `(existedBefore, existsAfter)`. `Window` is rejected on streaming children (NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING), and unlike the row-level passes the netChanges aggregate is keyed by `rowId` only -- there's no commit-version + commit-timestamp grouping that would let us reuse the streaming Aggregate pattern.

This PR adds a streaming-friendly equivalent by delegating per-row-identity state management to a new `CdcNetChangesStatefulProcessor` driven by `TransformWithState`:

- The processor stores the first and most-recent event observed for each row identity in `ValueState[Row]`.
- An event-time timer is armed on the latest `_commit_timestamp` observed for the key. When the global watermark advances past the timer, `handleExpiredTimer` evaluates the SPIP matrix and emits 0, 1, or 2 output rows -- identical semantics to the batch path.

The analyzer rule constructs `TransformWithState` directly (no precedent in catalyst for this; the typed-Dataset DSL is the usual entry point). Encoders for the input/output `Row` and the rowId tuple are built via `ExpressionEncoder(StructType)`. Nested rowId paths (e.g. `payload.id`) are handled by aliasing each rowId expression to a top-level `__spark_cdc_rowid_<i>` helper column before the `TransformWithState`, then dropping the helpers in a final `Project` so the user-visible schema matches the connector's declared changelog schema.

Plan shape:

EventTimeWatermark(_commit_timestamp, 0s)
  -> Project (alias rowId expressions to flat helper columns)
  -> TransformWithState (grouping=rowId helpers, EventTime mode, Append)
  -> SerializeFromObject
  -> Project (drop rowId helper columns)
When carry-over removal / update detection are also requested, the row-level rewrite is applied first; the netChanges `TransformWithState` then sits on top of it and the rule reuses the existing `EventTimeWatermark` rather than stacking another (multi-watermark stacking is rejected unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set).

Documented limitation

Row identities only touched in the latest observed commit are held back until a later commit (with strictly greater `_commit_timestamp`) advances the watermark past them, or the source terminates. End-of-input flushes all timers, so bounded streams produce output equivalent to the corresponding batch read. This matches the steady-state behavior of the row-level streaming rewrite.

Also removes the now-obsolete error class `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` introduced in SPARK-56686.

Why are the changes needed?
Without this PR, `deduplicationMode = netChanges` is unavailable on streaming CDC reads, forcing users with intermediate-state connectors (`containsIntermediateChanges = true`) to fall back to batch reads when they want a deduplicated change feed. With SPARK-56686 already shipping carry-over removal and update detection for streaming, netChanges was the only post-processing pass still gated to batch -- this completes the surface.

Does this PR introduce any user-facing change?
Yes. `spark.readStream.changes(...)` now supports `deduplicationMode = netChanges`. Previously this threw `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`. The `DataStreamReader.changes()` and `Changelog.java` Scaladoc has been updated to describe the supported behavior and the latest-commit limitation.

Note: the netChanges streaming path uses `TransformWithState`, which requires the RocksDB state store backend (`spark.sql.streaming.stateStore.providerClass = ...RocksDBStateStoreProvider`). Spark surfaces `UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS` if the default HDFS-backed provider is left in place, so this is discoverable.

How was this patch tested?
89 tests pass across 4 CDC suites (all green):

- ResolveChangelogTableStreamingPostProcessingSuite -- two new plan-shape tests: "netChanges alone injects watermark + TransformWithState" and "netChanges + carry-over removal share a single watermark" (verifies that the netChanges `TransformWithState` reuses the row-level rewrite's `EventTimeWatermark` rather than stacking another).
- ChangelogResolutionSuite -- the "netChanges throws" test from SPARK-56686 is flipped to assert that exactly one `TransformWithState` appears in the analyzed plan.
- ResolveChangelogTablePostProcessingSuite -- the corresponding netChanges throw test is similarly flipped.
- ChangelogEndToEndSuite -- two new end-to-end tests that drive a streaming query against `InMemoryChangelogCatalog` with the RocksDB state store: "streaming netChanges collapses INSERT then DELETE to no output" (confirms the `(false, false)` cancel case and that end-of-input flushes the latest commit's group) and "streaming netChanges with computeUpdates labels persisting rows as updates" (confirms the `(false, true)` case relabels correctly).

Also confirmed UnsupportedOperationsSuite (216 tests) still passes.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)